package com.starter.es.service.impl;

import com.alibaba.fastjson.JSON;
import com.starter.es.service.IndexService;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;

@Slf4j
public class IndexServiceImpl implements IndexService {

  private RestHighLevelClient client;

  public IndexServiceImpl(RestHighLevelClient client) {
    this.client = client;
  }

  @Override
  public boolean deleteIndex(String indexName) {
    log.info("deleteIndex——删除一个索引是, indexName:[ {} ] ", indexName);
    try {
      DeleteIndexRequest request = new DeleteIndexRequest(indexName);
      client.indices().delete(request, RequestOptions.DEFAULT);
      return true;
    } catch (IOException exception) {
    } catch (ElasticsearchException exception) {
      if (exception.status() == RestStatus.NOT_FOUND) {
        log.warn("deleteIndex——删除一个索引，索引不存在, indexName:[ {} ] ", indexName);
      }
    }
    return false;
  }

  @Override
  public boolean existIndex(String indexName) {

    log.info("existIndex——判断一个索引是否存在, indexName:[ {} ] ", indexName);

    GetIndexRequest request = new GetIndexRequest(indexName);
    try {
      boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
      return exists;
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    return false;
  }

  @Override
  public void createMapping(String indexName, XContentBuilder mapping) {
    log.info("createMapping_创建索引映射开始, indexName:[ {} ], mapping:[ {} ]", indexName, mapping);
    try {
      CreateIndexRequest index = new CreateIndexRequest(indexName);
      index.source(mapping);
      client.indices().create(index, RequestOptions.DEFAULT);
      log.info("createMapping_创建索引映射成功！！！, indexName:[ {} ], mapping:[ {} ]", indexName, mapping);

    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void indexDoc(String indexName, String indexType, Map<String, Object> doc) {
    log.info("indexDoc_索引一篇文档, indexName:[ {} ], indexType:[ {} ], doc:[ {} ]", indexName,
        indexType, doc);

    IndexRequest indexRequest = new IndexRequest(indexName, indexType, (String) doc.get("key"))
        .source(doc);
    try {
      IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);

      log.info("indexDoc_索引一篇文档成功！！！, indexName:[ {} ], indexType:[ {} ], doc:[ {} ]", indexName,
          indexType, response);

    } catch (ElasticsearchException e) {
      if (e.status() == RestStatus.CONFLICT) {
        System.out.println("写入索引产生冲突" + e.getDetailedMessage());
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void indexDocWithRouting(String indexName, String indexType, String route,
      Map<String, Object> doc) {
    log.info(
        "indexDocWithRouting_带路由索引一篇文档, indexName:[ {} ], indexType:[ {} ], route:[ {} ], doc:[ {} ]",
        indexName, indexType, route, doc);
    IndexRequest indexRequest = new IndexRequest(indexName, indexType, (String) doc.get("key"))
        .source(doc);
    indexRequest.routing(route);
    try {
      IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);

      log.info(
          "indexDocWithRouting_带路由索引一篇文档成功！！！, indexName:[ {} ], indexType:[ {} ], route:[ {} ], doc:[ {} ]",
          indexName, indexType, route, doc);

    } catch (ElasticsearchException e) {
      if (e.status() == RestStatus.CONFLICT) {
        System.out.println("写入索引产生冲突" + e.getDetailedMessage());
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


  @Override
  public void indexDocs(String indexName, String indexType, List<Map<String, Object>> docs) {
    log.info("indexDocs_索引一组文档, indexName:[ {} ], indexType:[ {} ], docs:[ {} ]", indexName,
        indexType, docs);

    try {
      if (null == docs || docs.size() <= 0) {
        return;
      }
      BulkRequest request = new BulkRequest();
      for (Map<String, Object> doc : docs) {
        request.add(new IndexRequest(indexName, indexType, (String) doc.get("key"))
            .source(doc));
      }
      BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
      if (bulkResponse != null) {
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
          DocWriteResponse itemResponse = bulkItemResponse.getResponse();

          if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
              || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            System.out.println("indexDocs_索引一组文档_新增成功: " + indexResponse.toString());

            log.info(
                "indexDocs_索引一组文档_新增成功！！！, indexName:[ {} ], indexType:[ {} ], indexResponse:[ {} ]",
                indexName, indexType, indexResponse);

          } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            System.out.println("indexDocs_索引一组文档_修改成功: " + updateResponse.toString());
            log.info(
                "indexDocs_索引一组文档_修改成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
                indexName, indexType, updateResponse);

          } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            System.out.println("删除成功:" + deleteResponse.toString());
            log.info(
                "indexDocs_索引一组文档_删除成功！！！, indexName:[ {} ], indexType:[ {} ], deleteResponse:[ {} ]",
                indexName, indexType, deleteResponse);
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


  @Override
  public void indexDocsWithRouting(String indexName, String indexType,
      List<Map<String, Object>> docs) {

    log.info("indexDocsWithRouting_带路由索引一组文档, indexName:[ {} ], indexType:[ {} ], docs:[ {} ]",
        indexName, indexType, docs);

    try {
      if (null == docs || docs.size() <= 0) {
        return;
      }
      BulkRequest request = new BulkRequest();
      for (Map<String, Object> doc : docs) {
        HashMap<String, Object> join = (HashMap<String, Object>) doc.get("joinkey");
        String route = (String) join.get("parent");
        request.add(new IndexRequest(indexName, indexType, (String) doc.get("key"))
            .source(doc).routing(route));
      }
      BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
      if (bulkResponse != null) {
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
          DocWriteResponse itemResponse = bulkItemResponse.getResponse();

          if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
              || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            System.out.println("新增成功" + indexResponse.toString());
            log.info(
                "indexDocsWithRouting_带路由索引一组文档——新增成功！！！, indexName:[ {} ], indexType:[ {} ], indexResponse:[ {} ]",
                indexName, indexResponse);

          } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            System.out.println("修改成功" + updateResponse.toString());
            log.info(
                "indexDocsWithRouting_带路由索引一组文档——修改成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
                indexName, updateResponse);
          } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            System.out.println("删除成功" + deleteResponse.toString());
            log.info(
                "indexDocsWithRouting_带路由索引一组文档——删除成功！！！, indexName:[ {} ], indexType:[ {} ], deleteResponse:[ {} ]",
                indexName, deleteResponse);
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public int deleteDoc(String indexName, String indexType, String id) {

    log.info("deleteDoc——删除一篇文档, indexName:[ {} ], indexType:[ {} ], id:[ {} ]", indexName, id);

    DeleteResponse deleteResponse = null;
    DeleteRequest request = new DeleteRequest(indexName, indexType, id);
    try {
      deleteResponse = client.delete(request, RequestOptions.DEFAULT);
      System.out.println("删除成功" + deleteResponse.toString());
      log.info(
          "deleteDoc——删除一篇文档--删除成功！！！, indexName:[ {} ], indexType:[ {} ], id:[ {} ], deleteResponse:[ {} ]",
          indexName, id, deleteResponse);

      if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
        System.out.println("删除失败，文档不存在" + deleteResponse.toString());
        log.error(
            "deleteDoc——删除一篇文档--删除失败，文档不存在！！！！, indexName:[ {} ], indexType:[ {} ], id:[ {} ], deleteResponse:[ {} ]",
            indexName, id, deleteResponse);
        return -1;
      }
    } catch (ElasticsearchException e) {
      if (e.status() == RestStatus.CONFLICT) {
        System.out.println("删除失败，版本号冲突" + deleteResponse.toString());
        log.error(
            "deleteDoc——删除一篇文档--删除失败，删除失败，版本号冲突！！！！, indexName:[ {} ], indexType:[ {} ], id:[ {} ], deleteResponse:[ {} ]",
            indexName, id, deleteResponse);

        return -2;
      }
    } catch (IOException e) {
      e.printStackTrace();
      return -3;
    }
    //
    return 1;
  }

  @Override
  public int deleteDoc(String indexName, String indexType, String filedName, Object value) {
    log.info(
        "deleteDoc——删除一篇文档, indexName:[ {} ], indexType:[ {} ], filedName:[ {} ], value:[ {} ]",
        indexName, filedName, JSON.toJSONString(value));

    BulkByScrollResponse deleteResponse = null;
    try {
      DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
      request.setDocTypes(indexType);
      request.setQuery(new TermQueryBuilder(filedName, value));
      request.setAbortOnVersionConflict(false);

      BulkByScrollResponse resp = client.deleteByQuery(request, RequestOptions.DEFAULT);
      long deleted = resp.getDeleted();
      log.info(
          "deleteDoc——删除一篇文档, indexName:[ {} ], indexType:[ {} ], filedName:[ {} ], value:[ {} ]，删除【 {} 】条文档",
          indexName, filedName, JSON.toJSONString(value), deleted);
    } catch (ElasticsearchException e) {
      if (e.status() == RestStatus.CONFLICT) {
        log.error(
            "deleteDoc——删除一篇文档失败ElasticsearchException, indexName:[ {} ], indexType:[ {} ], filedName:[ {} ], value:[ {} ], deleteResponse:[ {} ]",
            indexName, filedName, JSON.toJSONString(value), JSON.toJSONString(deleteResponse));
        return -2;
      }
    } catch (IOException e) {
      e.printStackTrace();
      log.error(
          "deleteDoc——删除一篇文档失败IOException, indexName:[ {} ], indexType:[ {} ], filedName:[ {} ], value:[ {} ], deleteResponse:[ {} ]",
          indexName, filedName, JSON.toJSONString(value), JSON.toJSONString(deleteResponse));
      return -3;
    }
    return 1;
  }

  @Override
  public void updateDoc(String indexName, String indexType, Map<String, Object> doc) {

    log.info("updateDoc——修改一篇文档, indexName:[ {} ], indexType:[ {} ], doc:[ {} ]", indexName, doc);

    UpdateRequest request = new UpdateRequest(indexName, indexType, (String) doc.get("key"))
        .doc(doc);
    request.docAsUpsert(true);
    try {
      UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
      long version = updateResponse.getVersion();

      if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
        log.info("insert success, version is " + version);
        log.info(
            "updateDoc——修改一篇文档--新增成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
            indexName, updateResponse);

      } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        System.out.println("update success, version is " + version);
        log.info(
            "updateDoc——修改一篇文档--修改成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
            indexName, updateResponse);
      } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        log.info(
            "updateDoc——修改一篇文档--删除成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
            indexName, updateResponse);

      } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
        log.warn(
            "updateDoc——修改一篇文档--无操作！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
            indexName, updateResponse);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void BatchupdateDoc(String indexName, String indexType, Map<String, Object> doc) {

    log.info("批量修改一篇文档, indexName:[ {} ], indexType:[ {} ], doc:[ {} ]", indexName, doc);

    UpdateRequest request = new UpdateRequest(indexName, indexType, (String) doc.get("key"))
        .doc(doc);


    request.docAsUpsert(true);
    try {
      UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
      long version = updateResponse.getVersion();

      if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
        log.info("insert success, version is " + version);
        log.info(
            "updateDoc——修改一篇文档--新增成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
            indexName, updateResponse);

      } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
        System.out.println("update success, version is " + version);
        log.info(
            "updateDoc——修改一篇文档--修改成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
            indexName, updateResponse);
      } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        log.info(
            "updateDoc——修改一篇文档--删除成功！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
            indexName, updateResponse);

      } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
        log.warn(
            "updateDoc——修改一篇文档--无操作！！！, indexName:[ {} ], indexType:[ {} ], updateResponse:[ {} ]",
            indexName, updateResponse);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
